[FLINK-35157][runtime] Sources with watermark alignment get stuck once some subtasks finish #24757
Hi @1996fanrui, would you mind reviewing this for me when you have a moment? Thank you very much!
Hi @elon-X, I'm wondering whether "NoMoreSplitsEvent" is an appropriate way to check whether a subtask is finished.
Splits can be discovered dynamically. IIRC, a Kafka source subtask won't be FINISHED in a Flink streaming job even if it has no split. (The subtask may be assigned a split after a new Kafka partition is added.)
I prefer the solution Gyula proposed: "The solution could be to send out a max watermark event once the sources finish or to exclude them from the source coordinator."
We need to find a proper place in the code (where the source is definitely finished) to send MaxTimestamp.
I guess Line 452 in 046872c:
return DataInputStatus.END_OF_INPUT;
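To illustrate why reporting a max watermark unblocks alignment, here is a minimal sketch. It assumes the coordinator aligns on the minimum watermark reported across subtasks; AlignmentSketch and its methods are hypothetical stand-ins, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a coordinator that aggregates per-subtask watermarks. */
class AlignmentSketch {
    private final Map<Integer, Long> reported = new HashMap<>();

    /** Records the latest watermark reported by a subtask. */
    void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Assumption: a subtask that reached end of input reports the max watermark. */
    void finish(int subtask) {
        report(subtask, Long.MAX_VALUE);
    }

    /** The aligned watermark is taken as the minimum over all reported values. */
    long globalMin() {
        return reported.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
    }
}

class AlignmentSketchDemo {
    public static void main(String[] args) {
        AlignmentSketch sketch = new AlignmentSketch();
        sketch.report(0, 100L);
        sketch.report(1, 1042L);
        // Subtask 0 holds the minimum back while its last value stays recorded.
        System.out.println(sketch.globalMin());
        sketch.finish(0);
        // After subtask 0 reports Long.MAX_VALUE, only subtask 1 determines the minimum.
        System.out.println(sketch.globalMin());
    }
}
```

In this toy model, a finished subtask that never reports again would pin the minimum at its last watermark, which is the stuck behavior the issue describes.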
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
.../apache/flink/test/streaming/api/datastream/SubTaskFinishedWithWatermarkAlignmentITCase.java
@1996fanrui I've made some changes based on your suggestions. Please review them when you have a chance and let me know if any further improvements are needed. Thanks!
Hi @elon-X , thanks for the update!
The production code looks good to me. I left some comments on the test code; please take a look when you have time, thanks~
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
...s/src/test/java/org/apache/flink/test/streaming/api/datastream/WatermarkAlignmentITCase.java
...rc/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorAlignmentTest.java
@@ -377,4 +424,9 @@ private void assertLatestWatermarkAlignmentEvent(int subtask, long expectedWater
        assertThat(events.get(events.size() - 1))
                .isEqualTo(new WatermarkAlignmentEvent(expectedWatermark));
    }

    private void assertHasNoMoreSplits(int subtask, boolean expected) {
The method name is assertHasNoMoreSplits, but it also triggers signalNoMoreSplits.
I think sourceCoordinator.getContext().signalNoMoreSplits(subtask); should be moved out of this method.
Also, assertHasNoMoreSplits is called only once, and the method has only one line. I'm not sure whether we need to extract it as a separate method. The caller can call assertThat(sourceCoordinator.getContext().hasNoMoreSplits(subtask)).isEqualTo(expected); directly.
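The shape suggested above can be sketched as follows, with the action and the check kept as separate statements at the call site. FakeContext is a hypothetical stand-in for the coordinator context, and a plain check replaces the AssertJ call so the sketch stays self-contained.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the coordinator context used in the test; not Flink's class. */
class FakeContext {
    private final Set<Integer> noMoreSplits = new HashSet<>();

    /** Marks the subtask as having no further splits. */
    void signalNoMoreSplits(int subtask) {
        noMoreSplits.add(subtask);
    }

    /** Reports whether the subtask was marked as having no further splits. */
    boolean hasNoMoreSplits(int subtask) {
        return noMoreSplits.contains(subtask);
    }
}

class HelperShapeSketch {
    public static void main(String[] args) {
        FakeContext context = new FakeContext();
        int subtask0 = 0;

        // Action: the side effect is visible at the call site, not hidden in an assert helper.
        context.signalNoMoreSplits(subtask0);

        // Check: a one-line assertion written inline instead of a single-use helper method.
        if (!context.hasNoMoreSplits(subtask0)) {
            throw new AssertionError("subtask0 should have no more splits");
        }
    }
}
```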
        assertLatestWatermarkAlignmentEvent(subtask1, 1042);

        // mock noMoreSplits event
        assertHasNoMoreSplits(subtask0, true);
I'm not sure what testWatermarkAlignmentWhileSubtaskFinished is meant to test.
IIUC, the test still works even if we remove the assertHasNoMoreSplits(subtask0, true); line here, right?
My intention is to add a test case that simulates sending the noMoreSplits event, to show that the SourceCoordinator is working correctly. If we have WatermarkAlignmentITCase, perhaps we can remove testWatermarkAlignmentWhileSubtaskFinished. What do you think?
"My intention is to add a test case to simulate the sending of the noMoreSplits event"
Actually, this test checks the watermark result when the watermark of one subtask is Long.MAX_VALUE, right?
I think this test can be removed.
Updated, thanks for your patience.
Thanks @elon-X for the update!
LGTM assuming the CI is green.
@elon-X The CI fails. Could you rebase onto the master branch first? We can follow up on the CI after rebasing.
ad0d3c8 to 8ac3d8a
…e some subtasks finish (apache#24757)
…e some subtasks finish (#24757)
What is the purpose of the change
Sources with watermark alignment get stuck once some subtasks finish. This PR fixes that problem.
Brief change log
When a subtask finishes, the SourceOperator sends Long.MAX_VALUE to the SourceCoordinator, and the SourceCoordinator checks whether each subtask has finished before sending the event.
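The coordinator-side check can be sketched as below. This is a simplified illustration under assumptions, not the code in this PR: AnnounceSketch, markFinished, and recipients are hypothetical names, and the sketch only models skipping finished subtasks when choosing who receives the alignment event.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a coordinator that skips finished subtasks when announcing. */
class AnnounceSketch {
    private final int parallelism;
    private final Set<Integer> finished = new HashSet<>();

    AnnounceSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Records that a subtask has finished and should no longer receive events. */
    void markFinished(int subtask) {
        finished.add(subtask);
    }

    /** Returns the subtasks that should still receive the alignment event. */
    List<Integer> recipients() {
        List<Integer> result = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (!finished.contains(subtask)) {
                result.add(subtask);
            }
        }
        return result;
    }
}

class AnnounceSketchDemo {
    public static void main(String[] args) {
        AnnounceSketch coordinator = new AnnounceSketch(3);
        coordinator.markFinished(1);
        // Only the subtasks that are still running remain in the recipient list.
        System.out.println(coordinator.recipients());
    }
}
```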
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation